agentmux_srv\backend\process_tracker/
registry.rs1use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, OnceLock};
19
20use parking_lot::Mutex;
21
22use super::{new_tracker, TrackedProcess, TrackerHandle, TrackingConfidence};
23use crate::backend::wps;
24
25static GLOBAL: OnceLock<Arc<AgentProcessRegistry>> = OnceLock::new();
31
32pub fn set_global(registry: Arc<AgentProcessRegistry>) {
33 let _ = GLOBAL.set(registry);
34}
35
36pub fn global() -> Option<Arc<AgentProcessRegistry>> {
37 GLOBAL.get().cloned()
38}
39
40pub struct AgentProcessRegistry {
41 inner: Mutex<HashMap<String, RegistryEntry>>,
42 broker: Option<Arc<wps::Broker>>,
43}
44
45struct RegistryEntry {
46 tracker: Arc<dyn TrackerHandle>,
47 last_pids: HashSet<u32>,
51}
52
53impl AgentProcessRegistry {
54 pub fn new(broker: Option<Arc<wps::Broker>>) -> Self {
55 Self {
56 inner: Mutex::new(HashMap::new()),
57 broker,
58 }
59 }
60
61 pub fn ensure_tracker(&self, block_id: &str) -> Arc<dyn TrackerHandle> {
65 let mut map = self.inner.lock();
66 if let Some(entry) = map.get(block_id) {
67 return entry.tracker.clone();
68 }
69 let tracker = new_tracker(block_id);
70 map.insert(
71 block_id.to_string(),
72 RegistryEntry {
73 tracker: tracker.clone(),
74 last_pids: HashSet::new(),
75 },
76 );
77 tracing::info!(
78 block_id = %block_id,
79 confidence = ?tracker.confidence(),
80 "[process-tracker] registered tracker"
81 );
82 tracker
83 }
84
85 pub fn remove(&self, block_id: &str) {
89 let mut map = self.inner.lock();
90 if map.remove(block_id).is_some() {
91 tracing::info!(block_id = %block_id, "[process-tracker] dropped tracker on pane close");
92 }
93 }
94
95 pub fn list_block(&self, block_id: &str) -> Vec<TrackedProcess> {
97 self.inner
98 .lock()
99 .get(block_id)
100 .map(|e| e.tracker.list_members())
101 .unwrap_or_default()
102 }
103
104 pub fn confidence_of(&self, block_id: &str) -> TrackingConfidence {
107 self.inner
108 .lock()
109 .get(block_id)
110 .map(|e| e.tracker.confidence())
111 .unwrap_or(TrackingConfidence::None)
112 }
113
114 pub fn list_all_blocks(&self) -> Vec<String> {
116 self.inner.lock().keys().cloned().collect()
117 }
118
119 pub fn kill_tree(&self, block_id: &str) -> bool {
125 let tracker = self.inner.lock().get(block_id).map(|e| e.tracker.clone());
126 match tracker {
127 Some(t) => {
128 t.kill_tree();
129 true
130 }
131 None => false,
132 }
133 }
134
135 pub fn kill_pid(&self, block_id: &str, pid: u32) -> bool {
138 let tracker = self.inner.lock().get(block_id).map(|e| e.tracker.clone());
139 match tracker {
140 Some(t) => t.kill_pid(pid),
141 None => false,
142 }
143 }
144
145 pub fn poll_and_emit(&self) {
151 let mut map = self.inner.lock();
152 for (block_id, entry) in map.iter_mut() {
153 let current_members = entry.tracker.list_members();
154 let current_pids: HashSet<u32> = current_members.iter().map(|p| p.pid).collect();
155
156 for added_pid in current_pids.difference(&entry.last_pids) {
157 if let Some(p) = current_members.iter().find(|p| p.pid == *added_pid) {
158 self.emit(
159 "agent:process-added",
160 block_id,
161 serde_json::json!({ "block_id": block_id, "process": p }),
162 );
163 }
164 }
165 for removed_pid in entry.last_pids.difference(¤t_pids) {
166 self.emit(
167 "agent:process-exited",
168 block_id,
169 serde_json::json!({ "block_id": block_id, "pid": removed_pid }),
170 );
171 }
172
173 entry.last_pids = current_pids;
174 }
175 }
176
177 fn emit(&self, event_name: &str, block_id: &str, data: serde_json::Value) {
178 let Some(ref broker) = self.broker else { return };
179 broker.publish(wps::WaveEvent {
180 event: event_name.to_string(),
181 scopes: vec![format!("block:{}", block_id)],
182 sender: String::new(),
183 persist: 0,
184 data: Some(data),
185 });
186 }
187}
188
189pub fn spawn_poller(registry: Arc<AgentProcessRegistry>) {
194 tokio::spawn(async move {
195 let mut interval = tokio::time::interval(std::time::Duration::from_secs(2));
196 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
197 loop {
198 interval.tick().await;
199 registry.poll_and_emit();
200 }
201 });
202}